/*
 *    Copyright 2022 The DSMS Authors.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

package com.dsms.modules.cluster.service.impl;

import cn.hutool.core.net.NetUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.dsms.common.constant.ClusterStatusEnum;
import com.dsms.common.constant.ResultCode;
import com.dsms.common.constant.SystemConst;
import com.dsms.common.exception.DsmsEngineException;
import com.dsms.common.remotecall.model.RemoteRequest;
import com.dsms.dfsbroker.auth.api.dto.AuthApi;
import com.dsms.dfsbroker.auth.model.AuthDTO;
import com.dsms.dfsbroker.auth.model.remote.AuthResponse;
import com.dsms.dfsbroker.cluster.api.ClusterApi;
import com.dsms.dfsbroker.cluster.model.Cluster;
import com.dsms.dfsbroker.cluster.model.remote.StatusResult;
import com.dsms.dfsbroker.cluster.service.IClusterService;
import com.dsms.modules.cluster.mapper.ClusterMapper;
import com.dsms.modules.util.RadosSingleton;
import com.dsms.modules.util.RemoteCallUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.security.core.AuthenticationException;
import org.springframework.stereotype.Service;
import springfox.documentation.annotations.Cacheable;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;

import static com.dsms.common.constant.SystemConst.CONNECT_TIMEOUT;

@Service
@Slf4j
public class ClusterServiceImpl extends ServiceImpl<ClusterMapper, Cluster> implements IClusterService {


    private final ClusterApi clusterApi;
    private final AuthApi authApi;

    @Autowired
    public ClusterServiceImpl (ClusterApi clusterApi, AuthApi authApi) {
        this.clusterApi = clusterApi;
        this.authApi = authApi;
    }

    @Override
    public Cluster about() {
        LambdaQueryWrapper<Cluster> queryWrapper = new QueryWrapper<Cluster>().lambda().eq(Cluster::getBindStatus, ClusterStatusEnum.BIND.getStatus());
        //get cluster related info
        List<Cluster> list = this.list(queryWrapper);
        if (list.isEmpty()) {
            throw new DsmsEngineException(ResultCode.CLUSTER_NOTBIND);
        }

        return list.get(0);
    }

    @Override
    public Cluster bind(Cluster cluster) {
        //1. Verify that a cluster is bound
        LambdaQueryWrapper<Cluster> queryWrapper = new QueryWrapper<Cluster>().lambda().eq(Cluster::getBindStatus, ClusterStatusEnum.BIND.getStatus());

        long count = this.count(queryWrapper);
        if (count > 0) {
            throw new DsmsEngineException(ResultCode.CLUSTER_ALREADYBIND);
        }
        //2. verify dsms-storage address connectivity
        String clusterAddress = cluster.getClusterAddress();
        boolean ping = NetUtil.ping(clusterAddress, CONNECT_TIMEOUT);
        if (!ping) {
            throw new DsmsEngineException(ResultCode.CLUSTER_ADDRESS_NET_ERROR);
        }
        //3. verify dsms-storage server port connectivity
        Integer clusterPort = cluster.getClusterPort();
        try (Socket socket = new Socket(clusterAddress, clusterPort)) {
            boolean connected = socket.isConnected();
        } catch (IOException e) {
            throw new DsmsEngineException(e, ResultCode.CLUSTER_SERVICEERROR);
        }
        //4. verify dsms-storage restful api key
        RemoteCallUtil.updateRemoteFixedParam(cluster);
        RemoteRequest remoteRequest = RemoteCallUtil.generateRemoteRequest();

        try {
            Cluster clusterInfo = clusterApi.getClusterVersion(remoteRequest);
            log.info(clusterInfo.toString());
        } catch (AuthenticationException e) {
            throw new DsmsEngineException(e, ResultCode.CLUSTER_KEY_ERROR);
        }
        //5. get dsms-storage admin key
        try {
            AuthResponse clusterAuthInfo = authApi.getAuthKey(RemoteCallUtil.generateRemoteRequest(), new AuthDTO(SystemConst.DSMS_STORAGE_AUTH_PREFIX + SystemConst.DSMS_STORAGE_USER));
            cluster.setAdminKey(clusterAuthInfo.getKey());
        } catch (Throwable e) {
            throw new DsmsEngineException(e, ResultCode.CLUSTER_ADMIN_KEY_ERROR);
        }
        //6. get dsms-storage fsid
        try {
            StatusResult status = clusterApi.getStatus(RemoteCallUtil.generateRemoteRequest());
            cluster.setFsid(status.getFsid());
        } catch (Throwable e) {
            throw new DsmsEngineException(e, ResultCode.CLUSTER_FSID_ERROR);
        }
        //6. insert cluster info to cluster table
        cluster.setBindStatus(ClusterStatusEnum.BIND.getStatus());
        this.save(cluster);

        return cluster;
    }

    @Override
    @CacheEvict("CurrentBindCluster")
    public boolean unbind() {
        LambdaQueryWrapper<Cluster> removeWrapper = Wrappers.<Cluster>lambdaQuery().eq(Cluster::getBindStatus, ClusterStatusEnum.BIND.getStatus());

        boolean isRemove = this.remove(removeWrapper);
        if (!isRemove) {
            throw new DsmsEngineException(ResultCode.CLUSTER_NOTBIND);
        }
        RemoteCallUtil.remoteFixedParam = null;

        //should shut down rados while unbind cluster,otherwise dsms-engine will not reacquire new rados objects
        RadosSingleton.INSTANCE.shutdownRados();

        return isRemove;
    }

    @Override
    @Cacheable("CurrentBindCluster")
    public Cluster getCurrentBindCluster() {
        LambdaQueryWrapper<Cluster> queryWrapper = new QueryWrapper<Cluster>().lambda().eq(Cluster::getBindStatus, ClusterStatusEnum.BIND.getStatus());

        List<Cluster> list = this.list(queryWrapper);
        if (list.isEmpty()) {
            throw new DsmsEngineException(ResultCode.CLUSTER_NOTBIND);
        }
        return list.get(0);
    }

}
